WIP Allowing named worker-setups #2391
base: main
Conversation
1. Convert worker-setups to a dictionary. 2. Coroutines in the scheduler take a callbacks dict as input, where functions are serialized as strings/bytes. 3. When the name already exists, the newly registered setup function will immediately be run on all existing workers, but new workers will only see the updated version of the setup function.
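To make the intent concrete, here is a minimal standalone sketch of the registration behaviour described above; the helper names (register_worker_setup, FakeWorker) are purely illustrative and not part of this PR:

# Illustrative sketch only: setups keyed by name; re-registering a name
# replaces the stored function, and the new version runs immediately on
# workers that are already connected.
worker_setups = {}  # name -> setup callable (serialized bytes in the real PR)

def register_worker_setup(name, setup, existing_workers):
    worker_setups[name] = setup       # later-joining workers only see this version
    for worker in existing_workers:   # run right away on existing workers
        setup(worker)

class FakeWorker:  # stand-in for a real worker process, for illustration
    def __init__(self, name):
        self.name = name

workers = [FakeWorker("w1"), FakeWorker("w2")]
register_worker_setup("database", lambda w: print("setting up", w.name), workers)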
This is a WIP. I am having difficulty running the tests. pytest gives me strange errors:
Unfortunately I'm not familiar with that error message. I recommend trying from a clean environment or making sure that you don't have other configuration lying around. I know that this is a WIP, but one thought on quick review is that it would be good to support backwards compatibility for the Client API. In particular we might place the …
If we use an optional name it shall probably be the fully qualified function name to avoid accidental conflicts. In many cases it is impossible to uniquely define a fully qualified function name for the callback function, so callers are forced to provide a name anyway. Also problematic is the planned inclusion of other callback functions; when that happens, which function name do we use?
I'm also against API breakage. Accidental conflicts could only come from several users sharing the same cluster; in that case it should be clearly documented to use a name or label for the callback.
I'm not sure I get this: we should record one callback per function, so one function name per callback?
OK. Let me figure out what I can do.
assert len(s.worker_setups) == 2

# Check it has been run
result = yield c.run(test_startup2)
assert list(result.values()) == [True] * 2

# unregister a preload function
I changed 'register_worker_callbacks' to return the registered names to allow unregistering callback functions with inferred names.
self.batched_stream = BatchedSend(interval='2ms', loop=self.loop)
self.batched_stream.start(comm)
self.periodic_callbacks['heartbeat'].start()
self.loop.add_callback(self.handle_scheduler, comm)

# run eventual init functions (only worker-setups for now)
I moved this to run the worker init functions after heartbeat; also I am using the local worker_setups dictionary rather than the unpickled function directly.
distributed/scheduler.py
""" Registers a setup function, and call it on every worker """ | ||
if setup is None: | ||
raise gen.Return({}) | ||
def register_worker_callbacks(self, comm, callbacks, names): |
I preserved the Client API, but changed the scheduler API in an incompatible way:
This is to avoid
register_worker_callbacks(self, comm, setup, setup_name, teardown, teardown_name):
I understand the problem, but I'm not entirely convinced by the proposed solution. I've not much to propose though...
Are you expecting that we register more than a setup and teardown callbacks at once in the future?
Could we use tuples instead of a dict containing several callbacks? E.g. stick to
register_worker_callbacks(self, comm, setup=None, teardown=None):
but where we expect setup or teardown to be (name, function)?
I think a tuple would work. We can actually extend this all the way to the worker: if a tuple is given then the first element is the name. If not a tuple, then we generate a name from the function.
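A small sketch of that normalization, assuming dask.base.tokenize for the generated suffix (the helper name normalize_callback is hypothetical, not code from this PR):

from dask.base import tokenize

def normalize_callback(cb):
    # Accept either a (name, function) tuple or a bare callable;
    # for a bare callable, derive a name from the function itself.
    if isinstance(cb, tuple):
        name, func = cb
    else:
        func = cb
        name = func.__name__ + '-' + tokenize(func)
    return name, func

def setup_database(worker=None):
    pass

print(normalize_callback(setup_database))                  # inferred name
print(normalize_callback(('database', setup_database)))    # explicit name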
I will rework the PR along this line.
Sorry for not responding to this earlier. I personally prefer asking for the name= keyword explicitly rather than passing it through as a tuple.
client.register_worker_callbacks(setup=setup_database, teardown=teardown_database, name='database')
I personally find passing tuples somewhat awkward.
@guillaumeeb what was your concern with the name= keyword solution? That it is hard to provide ordered parameter compatibility going forward?
I liked the tuple because:
- each callback needs a different name.
- it was decided register_worker_callbacks can register multiple callbacks in one call.
- there is no clean way to assign names to callbacks with a single name argument and multiple callbacks.
My concern was not about the name= keyword solution, but about the callbacks=dict solution, which I find kind of obscure even if documented. The tuple may not be better though.
I think I prefer the explicit register_worker_callbacks(self, comm, setup, setup_name, teardown, teardown_name) over the one with only a dict of callbacks.
There wouldn't be a problem if the function was instead:
register_worker_callback(self, comm, callback_type, function, name)
I doubt there is a need to make registering several callbacks atomic.
Here is another possibility:
register_worker_callbacks(self, comm, setup, name=None)
If setup takes two arguments, setup(worker, event), then we will call it as setup(worker, 'setup') and setup(worker, 'teardown').
If setup takes one or zero arguments, we only call it during setup.
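A sketch of how the worker could dispatch on the callback's arity under this proposal; the inspect-based check and the run_callback name are my assumptions, not code from this PR:

from inspect import signature

def run_callback(setup, worker, event):
    # Two-argument callbacks receive the event name; one- or zero-argument
    # callbacks only run at setup time.
    nargs = len(signature(setup).parameters)
    if nargs >= 2:
        setup(worker, event)
    elif nargs == 1 and event == 'setup':
        setup(worker)
    elif nargs == 0 and event == 'setup':
        setup()

def with_event(worker, event):
    print('called for', event)

def setup_only(worker):
    print('setup only')

run_callback(with_event, worker='w1', event='teardown')   # runs
run_callback(setup_only, worker='w1', event='teardown')   # skipped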
Maybe when we add teardowns we decide that we need classes instead?
So today
def register_worker_callbacks(self, comm, setup, name=None):
Where setup is a callable.
In the future
def register_worker_callbacks(self, comm, cb, name=None):
Where cb is either a callable, in which case it is interpreted as a setup function, or it is a class with the following structure:
class MyWorkerCallback:
    def __init__(self, worker):
        ...
    def setup(self):
        ...
    def tearDown(self):
        ...
In this way we continue to maintain the same structure today, provide a path towards supporting teardowns in the future, but don't have to rely on implicit tuple semantics.
distributed/client.py
callbacks['setup'] = serialized
if name is None:
    h = md5(serialized)
    name = funcname(setup) + '-' + h.hexdigest()
This achieves the hashing based on function "body" @mrocklin mentioned.
Doing a hash of the serialized function seems a little overkill to me.
Speed is not a concern here since it is only intended to be used at initialization.
The hash produces two different names for lib1.setup and lib2.setup, even when a name is not explicitly given. I would have preferred __qualname__, but it is only defined in Python 3.3+.
I recommend using dask.base.tokenize generally for hashing objects. We tend to centralize our logic there.
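For comparison, a tokenize-based variant of the name generation shown above (a sketch only, not the final code in this PR):

from dask.base import tokenize
from dask.utils import funcname

def setup_database(worker=None):
    pass

# Hash the function object itself rather than its md5'd pickled bytes
name = funcname(setup_database) + '-' + tokenize(setup_database)
print(name)   # e.g. 'setup_database-<hash>'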
Travis passed. (The failure is unrelated.) This is ready for review.
Generally it looks good, a few minor comments/questions. I'm not convinced by the callbacks kwarg.
distributed/client.py
raise gen.Return(result)

def unregister_worker_callbacks(self, setup=None):
    """ """
Missing docstring here.
Are we sure we want an unregistering function? What about workers where the callback has already been called?
I had the same doubt too!
Then I realized this is useful if one then calls client.restart() to restart all workers.
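A sketch of that usage pattern with the Client API proposed in this PR (the setup_database function and the local Client() are illustrative; unregister_worker_callbacks is this PR's proposed method, not a released API):

from distributed import Client

client = Client()   # local cluster, for illustration

def setup_database(worker=None):
    ...

client.register_worker_callbacks(setup=setup_database)
# ... later, drop the callback and bounce the workers so that freshly
# restarted workers no longer run it:
client.unregister_worker_callbacks(setup=setup_database)
client.restart()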
distributed/scheduler.py
""" Registers a setup function, and call it on every worker """ | ||
if setup is None: | ||
raise gen.Return({}) | ||
def register_worker_callbacks(self, comm, callbacks, names): |
I understand the problem, but I'm not entirely convinced by the proposed solution. I've not much to propose though...
Are you expecting that we register more than a setup and teardown callbacks at once in the future?
Could we use tuples instead of a dict containing several callbacks? E.g. stick to
register_worker_callbacks(self, comm, setup=None, teardown=None):
but where we expect setup or teardown to be (name, function)?
If setup=func is given, generate a name.
There is no longer a guarantee the setup function is always run together with the register call: if the same function is registered twice, it will not run the second time.
Some small style suggestions. I haven't looked at the full design yet though.
distributed/client.py
@@ -22,6 +22,7 @@
import socket
import warnings
import weakref
from hashlib import md5
We tend to use dask.base.tokenize for this (though what you've done here is fine).
distributed/client.py
The setup callback to remove; it will be matched by the name.
If a name is not given, then it is generated from the callable.
"""
return self.sync(self._unregister_worker_callbacks, setup=setup)
These two are simple enough that you can merge them. The definition of Client.cancel is a good example. You can use self.sync(self.scheduler.unregister_worker_callbacks, ...) directly.
Client.cancel still goes through Client._cancel. I had the impression it was convention to always have two interfaces for the same functionality: the coroutine version and the non-coroutine version.
Ah, you're right. Perhaps list_datasets then.
We need to create a coroutine if there is a yield somewhere. If not, then we're happy to skip it for simplicity.
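So the merged form would look roughly like this on the Client (a sketch following the list_datasets pattern; the exact signature and docstring are assumptions):

def unregister_worker_callbacks(self, setup=None):
    """ Unregister a previously registered worker callback """
    # Nothing is yielded on the client side, so no separate coroutine
    # is needed; self.sync handles both sync and async contexts.
    return self.sync(self.scheduler.unregister_worker_callbacks, setup=setup)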
Thanks. I changed it.
""" Registers a set of event driven callback functions on workers for the given name. | ||
|
||
setup must be a tuple of (name, serialized_function) | ||
""" |
For spacing I recommend the following:
"""
Registers a set of event driven callback functions on workers for the given name # <<--- move this down to avoid the slightly longer line
setup must be a tuple of (name, serialized_function) # <<<--- move this left to the same level as """
"""
distributed/scheduler.py
name, func = setup
self.worker_setups.pop(name)

raise gen.Return(None)
This doesn't need to be a coroutine. I recommend removing the @gen.coroutine line and replacing raise gen.Return(...) with return ...
To be clear, the reason here is that this method never yields, so it doesn't need to be a coroutine.
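A before/after sketch of that change, with the method signature guessed from the surrounding diff rather than taken from the PR:

from tornado import gen

# Before: a coroutine that never yields
@gen.coroutine
def unregister_worker_callbacks(self, comm=None, setup=None):
    name, func = setup
    self.worker_setups.pop(name)
    raise gen.Return(None)

# After: a plain method, since there is no yield
def unregister_worker_callbacks(self, comm=None, setup=None):
    name, func = setup
    self.worker_setups.pop(name)
    return None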
I apologize for the delay in review here. I've been travelling for work a bit more than is ideal.
Is there a way to …
The test case can be hugely simplified if I can do this.
The client.run method magically works as a coroutine when called from within a coroutine. So you should be able to do the following:
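Something along these lines, assuming the gen_cluster test helper and this PR's register_worker_callbacks API (the test body and names are illustrative, not the actual test):

from distributed.utils_test import gen_cluster

def test_startup2():
    return True

@gen_cluster(client=True)
def test_named_setup(c, s, a, b):
    yield c.register_worker_callbacks(setup=test_startup2)
    result = yield c.run(test_startup2)   # Client.run behaves as a coroutine here
    assert list(result.values()) == [True] * 2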
It won't work if I put yield client.run inside a nested function defined in the test case.